package com.think.cloud.thinkshop.mall.rabbit.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;


@Configuration
@Slf4j
public class RabbitConfig {


    public final static String QUEUE_NAME = "think_shop_queue";
    public final static String EXCHANGE_NAME = "think_shop_exchange";
    public final static String ROUTING_KEY_NAME = "think_shop_routing_Key";
    public final static String DELAY_QUEUE_NAME = "think_shop_delay_queue";
    public final static String DELAY_EXCHANGE_NAME = "think_shop_delay_exchange";
    public final static String DELAY_ROUTING_KEY_NAME = "think_shop_delay_routing_Key";


    /**
     * 指定bean name 避免bean重名导致重复绑定使队列失效
     */
    public final static String QUEUE_BEAN = "myQueue";
    public final static String EXCHANGE_BEAN = "exchange";
    public final static String DELAY_QUEUE_BEAN = "delayQueue";
    public final static String DELAY_EXCHANGE_BEAN = "delayExchange";

    @Resource
    private RabbitProperties properties;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(properties.getHost(), properties.getPort());
        connectionFactory.setUsername(properties.getUsername());
        connectionFactory.setPassword(properties.getPassword());
        connectionFactory.setVirtualHost(properties.getVirtualHost());
        connectionFactory.setPublisherConfirms(properties.isPublisherConfirms()); // 发送消息回调,必须要设置
        connectionFactory.setPublisherReturns(properties.isPublisherReturns());
        return connectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            System.out.println(ack ? "消息成功到达交换器" : "消息未达交换器: " + cause);
        });
        rabbitTemplate.setMandatory(true); // 设置消息必须到达队列，否则会触发ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.out.println("消息未达队列: " + replyCode + " - " + replyText);
        });
        return rabbitTemplate;
    }

    @Bean(QUEUE_BEAN)
    public Queue myQueue() {
        return new Queue(QUEUE_NAME, true); // true 表示队列持久化
    }

    @Bean(EXCHANGE_BEAN)
    DirectExchange exchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    @Bean
    Binding binding(@Qualifier(QUEUE_BEAN) Queue myQueue, @Qualifier(EXCHANGE_BEAN) DirectExchange exchange) {
        return BindingBuilder.bind(myQueue).to(exchange).with(ROUTING_KEY_NAME);
    }


    @Bean(name = DELAY_EXCHANGE_BEAN)
    CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct"); // 或者 "topic", "fanout", "headers" 根据你的需求
        return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    @Bean(name = DELAY_QUEUE_BEAN)
    Queue delayQueue() {
        return new Queue(DELAY_QUEUE_NAME, true);
    }

    @Bean
    Binding delayBinding(@Qualifier(DELAY_EXCHANGE_BEAN) CustomExchange delayExchange, @Qualifier(DELAY_QUEUE_BEAN) Queue delayQueue) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY_NAME).noargs();
    }


}
